package com.flink.examples.redis;

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.JedisPoolConfig;
import redis.clients.jedis.Protocol;

/**
 * @Description 从redis中读取数据输出到DataStream数据流中
 * @Author JL
 * @Date 2020/09/18
 * @Version V1.0
 */
public class DataStreamSource {
    /**
     * 官方文档：https://bahir.apache.org/docs/flink/current/flink-streaming-redis/
     */

    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        String key = "props";
        //实现RichSourceFunction抽象方法，加载数据源数据到流中
        DataStream<Tuple2<String, String>> dataStream = env.addSource(new RichSourceFunction<Tuple2<String, String>>(){
            private JedisPool jedisPool = null;
            @Override
            public void run(SourceContext<Tuple2<String, String>> ctx) throws Exception {
                jedisPool = new JedisPool(new JedisPoolConfig(), "127.0.0.1", 6379, Protocol.DEFAULT_TIMEOUT);
                Jedis jedis = jedisPool.getResource();
                try{
                    ctx.collect(Tuple2.of(key, jedis.get(key)));
                }catch(Exception e){
                    e.printStackTrace();
                }finally{
                    if (jedis != null){
                        //用完即关，内部会做判断，如果存在数据源与池，则回滚到池中
                        jedis.close();
                    }
                }
            }
            @Override
            public void cancel() {
                try {
                    super.close();
                }catch(Exception e){
                }
                if (jedisPool != null){
                    jedisPool.close();
                    jedisPool = null;
                }
            }
        });
        dataStream.print();
        env.execute("flink redis source");
    }

}
/*
3> (props,5514)
*/